import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.net.UnknownHostException;
import java.util.concurrent.CompletableFuture;

public class QueryState
{
    public static void main(String[] args) throws UnknownHostException, InterruptedException,Exception
    {
        QueryableStateClient client = new QueryableStateClient
                (
                "Desktop", // taskmanager的地址
                9069);// 默认是9069端口，可以在flink-conf.yaml文件中配置
//        这个和8081端口不是一个意思，不要搞混




        // the state descriptor of the state to be fetched.
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>
                        ("average",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));





        while (true)
        {

//            System.out.println("－－－－进入while－－－－");

            CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
                    client.getKvState(
                            JobID.fromHexString("528c7909a304fe1922034b859e44b759"), // 从FLINK WEB UI中获取Job ID
                            "query123", // wordcount中设置的名字 descriptor.setQueryable("query123");
                            1L, // key的值
                            BasicTypeInfo.LONG_TYPE_INFO, // key 的类型
                            descriptor);


            // now handle the returned value
            resultFuture.thenAccept(response ->
            {
                try
                {
                    Tuple2<Long, Long> res = response.value();
                    System.out.println("res: " + res);
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
            });


            Thread.sleep(1000);
        }




    }
}


//这个代码来自:
//https://blog.csdn.net/whr_yy/article/details/100535945
//代码大意为了读取已经保存的checkpoint中的内容
